package com.skyline.courier.mq.provider.rabbitmq;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownListener;

/**
 * RabbitMQ的Connection工厂，用于创建{@link Connection}对象
 * 
 * @author wuqh
 * 
 */
public class RabbitConnectionFactory {
	private static final Logger LOGGER = LoggerFactory.getLogger(RabbitConnectionFactory.class);
	private static final int RETRY_TIMES = 10;

	// spring injected
	private ConnectionFactory connectionFactory;
	private String hosts;
	private ShutdownListener[] shutdownListeners;

	private Connection connection;
	private Address[] knownHosts;
	private ExecutorService executorService;
	private int retryTimes = RETRY_TIMES;

	public synchronized Connection getConnection() throws IOException {

		if (knownHosts == null) {
			initKnownHosts();
		}

		if (connectionFactory == null) {
			connectionFactory = newConnectionFactory();
		}

		int count = 0;

		while (connection == null || !connection.isOpen()) {

			if (LOGGER.isInfoEnabled()) {
				LOGGER.info("即将使用virtualhost [" + connectionFactory.getVirtualHost() + "]与主机列表[" + hosts + "]中的一个建立链接");
			}

			try {
				if (executorService == null) {
					connection = connectionFactory.newConnection(knownHosts);
				} else {
					connection = connectionFactory.newConnection(executorService, knownHosts);
				}

				addShutdownListeners();

				if (LOGGER.isInfoEnabled()) {
					LOGGER.info("开始连接[" + connection.getAddress() + ":" + connection.getPort() + "]");
				}
			} catch (Exception e) {
				LOGGER.error("连接失败，5秒钟后重试...", e);
				count++;
				if (count > retryTimes) {
					break;
				}
				try {
					TimeUnit.SECONDS.sleep(5);
				} catch (InterruptedException e1) {
					LOGGER.warn("5秒等待状态被打断（遇到InterruptedException）");
				}

			}

		}

		if (connection == null) {
			throw new MQUnreachableException("无法连接到RabbitMQ服务器");
		}

		return connection;

	}

	private void initKnownHosts() {
		knownHosts = Address.parseAddresses(hosts);
	}

	private ConnectionFactory newConnectionFactory() {
		return new ConnectionFactory();
	}

	private void addShutdownListeners() {
		if (shutdownListeners != null) {
			for (ShutdownListener shutdownListener : shutdownListeners) {
				connection.addShutdownListener(shutdownListener);
			}
		}
	}

	public void destroy() {
		if (connection != null && connection.isOpen()) {
			try {
				connection.close();
			} catch (Exception e) {
				LOGGER.warn("关闭连接失败");
			}
		}
	}

	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.connectionFactory = connectionFactory;
	}

	public void setHosts(String hosts) {
		this.hosts = hosts;
	}

	public void setShutdownListeners(ShutdownListener... shutdownListeners) {
		this.shutdownListeners = shutdownListeners;
	}

	public void setRetryTimes(int retryTimes) {
		this.retryTimes = retryTimes;
	}
}
